package com.flink.examples.process;

import com.flink.examples.DataSource;
import com.google.gson.Gson;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.List;

/**
 * @Description KeyedProcessFunction方法：是一个基于KeyedStream流处理方法，可以灵活的对每个输入KeyedStream进行逻辑处理，并支持定时器时间事件
 * @Author JL
 * @Date 2020/09/15
 * @Version V1.0
 */
public class KeyedProcess {

    /**
     * 遍历集合，分别打印不同性别的信息，对于执行超时，自动触发定时器
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用event time，需要指定事件的时间戳
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<String> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //处理每keyBy后的每一个数据流，process方法通常应用于KeyedStream类型的数据流处理
                .process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>, String>() {
                    //可以缓存同一个keyBy下的上下文状态
                    private ValueState<String> name ;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        //初始化自定义上下文，并指定类型为String
                        name = getRuntimeContext().getState(new ValueStateDescriptor<String>("name", String.class));
                    }
                    /**
                     *  处理每一个输入的数据流
                     *   ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。
                     *   每在输入流中接收到一个事件，就会调用来此函数来处理。
                     *   对于容错的状态，ProcessFunction 可以通过 RuntimeContext 访问 KeyedState，类似于其他有状态函数访问 KeyedState。
                     * @param value     数据流对象
                     * @param ctx       上下文信息
                     * @param out       输出结果的集合
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple3<String, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        //模拟超时，如果王五，则暂停5s
                        if (value.f0.equals("王五")){
                            name.update(value.f0);
                            //注册ProcessingTime处理时间的定时器,当时间达到指定值时触发onTimer事件，如：当前元素因3s内处理完毕，超时则触发onTimer方法
                            ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + (3 * 1000));
                            //模拟5s超时
                            Thread.sleep(5 * 1000);
                        }
                        out.collect(new Gson().toJson(value));
                    }
                    /**
                     * 注册的定时器触发时调用
                     *  Timers 定时器可以对处理时间和事件时间的变化做一些处理。
                     *  每次调用 processElement() 都可以获得一个 Context 对象，通过该对象可以访问元素的事件时间戳以及 TimerService。
                     *  TimerService 可以为尚未发生的事件时间/处理时间实注册回调。
                     *  当定时器到达某个时刻时，会调用 onTimer() 方法。
                     *  在调用期间，所有状态再次限定为定时器创建的键，允许定时器操作 KeyedState。
                     * @param timestamp 定时器所设定的触发的时间戳
                     * @param ctx       上下文信息
                     * @param out       输出结果的集合
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String timeStr = DateFormatUtils.format(new Date(timestamp), "yyyy-MM-dd HH:mm:ss");
                        out.collect(ctx.getCurrentKey() + ",用户："+ name.value() + ",当前任务处理超时：" + timeStr);
                        //删除定时器触发时间
                        ctx.timerService().deleteProcessingTimeTimer(timestamp);
                        name.update(null);
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }
}
/*
{"f0":"张三","f1":"man","f2":20}
{"f0":"李四","f1":"girl","f2":24}
{"f0":"王五","f1":"man","f2":29}
man,用户：王五,当前任务处理超时：2020-09-16 11:17:15
{"f0":"刘六","f1":"girl","f2":32}
{"f0":"伍七","f1":"girl","f2":18}
{"f0":"吴八","f1":"man","f2":30}
*/